
[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps #16781

Closed
wants to merge 52 commits

Conversation

@squito (Contributor) commented Feb 2, 2017

What changes were proposed in this pull request?

This change allows timestamps in Parquet-based Hive tables to behave as a "floating time", without a timezone, as timestamps do for other file formats. If the storage timezone is the same as the session timezone, the conversion is a no-op. When data is read from a Hive table, the table property is always respected. This allows Spark to keep its old behavior when reading old data, while reading newly written data correctly (whatever the source of the data is).

Spark inherited the original behavior from Hive, but Hive is also updating behavior to use the same scheme in HIVE-12767 / HIVE-16231.

The default for Spark remains unchanged; created tables do not include the new table property.

This only applies to Hive tables; nothing is added to the Parquet metadata to indicate the timezone, so data read or written directly from Parquet files never has any conversion applied.
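
To make the semantics concrete, here is a minimal sketch of the intended usage (assuming a SparkSession named spark; the property key is the constant this patch adds, referenced by name rather than by its string value):

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

// Table-property key introduced by this patch.
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY

// With the property set, reads convert timestamps from the storage timezone
// to the session timezone, and writes apply the reverse conversion.
spark.sql(
  s"""CREATE TABLE utc_table (ts TIMESTAMP)
     |STORED AS PARQUET
     |TBLPROPERTIES ($key="UTC")""".stripMargin)

// Without the property, behavior is unchanged from before this patch.
spark.sql("CREATE TABLE legacy_table (ts TIMESTAMP) STORED AS PARQUET")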

How was this patch tested?

Added a unit test which creates tables, reads and writes data, under a variety of permutations (different storage timezones, different session timezones, vectorized reading on and off).
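
Roughly, the permutation structure looks like this (a sketch; names and values are drawn from the test snippets quoted below, not the exact test code):

for {
  (tableName, storageTz) <- Seq(
    "UTC" -> "UTC", "LA" -> "America/Los_Angeles", "Berlin" -> "Europe/Berlin")
  sessionTzOpt <- Seq(None, Some("UTC"), Some("America/Los_Angeles"))
  vectorized <- Seq(false, true)
} {
  // create a table with storageTz in TBLPROPERTIES, read and write data under
  // sessionTzOpt, and compare the resulting millis against precomputed values
}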

@SparkQA commented Feb 2, 2017

Test build #72287 has finished for PR 16781 at commit 223ce2c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 2, 2017

Test build #72288 has finished for PR 16781 at commit 5b49ae0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito closed this Feb 2, 2017
@squito reopened this Mar 6, 2017
@SparkQA commented Mar 6, 2017

Test build #74031 has finished for PR 16781 at commit 2c8a228.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 7, 2017

Test build #74042 has finished for PR 16781 at commit f0b89fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zivanfi commented Mar 7, 2017

Please update the pull request description, because the one dated Feb 2 does not correspond to the fix any more.

// The conf is sometimes null in tests.
String tzString =
    conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
if (tzString == null || tzString == "") {

This is Java code, not Scala; you probably meant tzString.equals("") instead of tzString == "".


Or even better, isEmpty().

@@ -674,6 +674,12 @@ object SQLConf {
.stringConf
.createWithDefault(TimeZone.getDefault().getID())

val PARQUET_TABLE_INCLUDE_TIMEZONE =
buildConf("spark.sql.session.parquet.timeZone")
.doc("""Enables inclusion of parquet timezone property in newly created parquet tables""")

There should be a config option for writing "UTC" to the table property when creating tables, not for writing the local timezone.
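
In other words, something along these lines (a sketch of the suggestion only; the config name is from the diff above, while the boolean type and default are hypothetical):

val PARQUET_TABLE_INCLUDE_TIMEZONE =
  buildConf("spark.sql.session.parquet.timeZone")
    .doc("When true, newly created Parquet tables record \"UTC\" in the " +
      "timezone table property rather than the session-local timezone.")
    .booleanConf
    .createWithDefault(false)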

testParquetHiveCompatibility(
Row(Seq(Row(1))),
"ARRAY<STRUCT<array_element: INT>>")
}

test(s"SPARK-12297: Parquet Timestamp & Hive timezone") {

I think it would be better to have separate test cases for adjustments when reading, adjustments when writing and setting the table property when creating tables.

@squito (Contributor Author) commented Apr 18, 2017

@ueshin thanks for taking a look. Yes, that understanding is correct. Another way to think about it is to compare the same operations across different file formats, e.g. textfile; those behave the way Parquet does after this patch. I had that explanation in a comment on the JIRA -- I just updated the JIRA description to include it.

I'll address your comments; they're also making me take a closer look at a couple of things. I should push an update tomorrow.

@squito (Contributor Author) commented Apr 20, 2017

@ueshin I've pushed an update which addresses your comments. I also realized that partitioned tables weren't handled correctly! I fixed that as well.

)
Seq(false, true).foreach { vectorized =>
withClue(s"vectorized = $vectorized;") {
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
Contributor Author

I was initially using SQLTestUtils.withSQLConf, but I discovered it wasn't actually taking effect. I don't know whether that's because TestHiveSingleton does something strange, or because I'm doing something else odd in this test by creating many new Spark sessions. I set the conf directly because it was the only way I could get the conf changes applied consistently.

Since I'm creating new sessions, I don't think there's any risk of a failed test not cleaning up and triggering failures in suites outside this one. But it still seems like I might be doing something wrong ...
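
For comparison, the usual SQLTestUtils pattern looks like this (shown only for reference; as noted above, it did not take effect in this suite):

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
  // body runs with the modified conf, which is restored afterwards
}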

@SparkQA commented Apr 20, 2017

Test build #75966 has finished for PR 16781 at commit 44a8bbb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin (Member) left a comment

@squito Thank you for working on this!
I checked the updates and added some comments.

Btw, can you fix the partitioned tables?

// hadoopConf for the Parquet Converters
val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val storageTz = relation.tableMeta.properties.getOrElse(storageTzKey, "")
val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
Member

sessionTz isn't used.

val sessionTz = sparkSession.sessionState.conf.sessionLocalTimeZone
Map(
storageTzKey -> storageTz
)
Member

Should this return Map.empty if the value isn't included in the table properties?
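
That is, something like (a sketch of the suggestion):

relation.tableMeta.properties.get(storageTzKey)
  .map(tz => Map(storageTzKey -> tz))
  .getOrElse(Map.empty[String, String])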

val schema = StructType(Seq(
StructField("display", StringType, true)
))
val df = spark.createDataFrame(rowRdd, schema)
Member

We can use val df = desiredTimestampStrings.toDF("display") after import spark.implicits._.

Contributor Author

Thanks, I appreciate the help simplifying this. I had a feeling it was more complex than it needed to be :)

// is for various "wall-clock" times in different timezones, and then we can compare against those
// in our tests.
val originalTz = TimeZone.getDefault
val timestampTimezoneToMillis = try {
Member

Shall we initialize this in the block like this?

val timestampTimezoneToMillis = {
  val originalTz = TimeZone.getDefault
  try {
    ...
  } finally {
    TimeZone.setDefault(originalTz)
  }
}

"UTC" -> "UTC",
"LA" -> "America/Los_Angeles",
"Berlin" -> "Europe/Berlin"
).foreach { case (tableName, zone) =>
Member

Should this be testTimezones.foreach { ... }?

@@ -42,6 +52,15 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
""".stripMargin)
}

override def afterEach(): Unit = {
Member

Why do we need this?

Contributor Author

I was probably just being a little paranoid; perhaps I had missed a withTable somewhere. In the current code, things work just fine if I remove them.

* the hour.
* @param storageTz the timezone which was used to store the timestamp. This should come from the
* timestamp table property, or else assume its the same as the sessionTz
* @return
Member

Can you also add descriptions for @param binary and @return?
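
For example (a sketch only; the @param binary description assumes Parquet's usual INT96 timestamp encoding, and the @return wording assumes the function yields microseconds since the epoch):

/**
 * ...
 * @param binary the Parquet INT96 value, assumed here to be 8 bytes of
 *               nanos-of-day followed by 4 bytes of Julian day
 * @return the timestamp as microseconds since the epoch, adjusted from
 *         storageTz to sessionTz
 */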

}

private def checkHasTz(table: String, tz: Option[String]): Unit = {
val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
Member

Should explicitly pass sparkSession and use it here?
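
I.e., something like (a sketch of the suggested signature; the assertion body is illustrative):

private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = {
  val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
  assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz)
}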

val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
withTable(baseTable, s"like_$baseTable", s"select_$baseTable") {
val localTz = TimeZone.getDefault()
val localTzId = localTz.getID()
Member

localTz and localTzId aren't used.

spark.sql(
raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
checkHasTz(baseTable, Some("America/Los_Angeles"))
spark.sql( raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
Member

nit: remove extra white space, and two more below this.

@squito (Contributor Author) commented Apr 25, 2017

@ueshin updated per your feedback.

I should have explained that the last update did handle partitioned tables (it added the second call to getStorageTzOptions in HiveMetastoreCatalog), though I didn't have any tests for it. It took me a while to figure out how, but this update includes tests for creating partitioned tables and reading from them. (The tests are becoming huge, but I think it's worth testing all of the permutations.)

@SparkQA commented Apr 25, 2017

Test build #76141 has finished for PR 16781 at commit e31657a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton {
class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton
with BeforeAndAfterEach {
Member

We don't need BeforeAndAfterEach anymore.

| display string,
| ts timestamp
|)
|PARTITIONED BY (id bigint)
Member

Should we also test a table partitioned by a timestamp, like PARTITIONED BY (ts timestamp)?

@squito (Contributor Author) commented May 2, 2017

@ueshin sorry it took me a while; I had to figure out how a table partitioned by timestamps works (I didn't even realize that was possible; I don't think it is in Hive?), and I was traveling.

The good news is that partitioning by timestamp works just fine. Since the partition value is stored as a string anyway, and is already converted using the session timezone, it already works. I added one minimal test for this: when the partitioned table is written, the correct partition dirs are created regardless of the timezone combinations.

In particular, it doesn't make sense to do tests like the existing ones, where we write or read "unadjusted" data, bypassing the Hive tables, and then make sure the right adjustments are applied when you perform the reverse action via the Hive table; the partition values are correct whether you use the Hive table and adjustment property or not.

Let me know if you think more tests are required.
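
For reference, a minimal sketch of the scenario the new test covers (DDL illustrative, not the exact test code):

spark.sql(
  """CREATE TABLE ts_part (display STRING)
    |PARTITIONED BY (ts TIMESTAMP)
    |STORED AS PARQUET""".stripMargin)
// After inserting rows, the partition directories under the table location
// should encode the same string form of each timestamp regardless of the
// storage-timezone table property.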

@SparkQA commented May 2, 2017

Test build #76391 has finished for PR 16781 at commit fc17a2e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton

@ueshin (Member) left a comment

@squito Thank you for working on this!
The behavior looks good to me.
I left some minor comments.
Thanks!

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.scalatest.BeforeAndAfterEach
Member

nit: unnecessary import.

val defaultTz = None
// check that created tables have correct TBLPROPERTIES
val tblProperties = explicitTz.map {
tz => raw"""TBLPROPERTIES ($key="$tz")"""
Member

Let's use s"" instead of raw"" where possible, and likewise elsewhere.

val timestampTimezoneToMillis = {
val originalTz = TimeZone.getDefault
try {
(for {
Member

Let's use flatMap { .. map { ... } }.
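
The two forms are equivalent; a minimal, self-contained illustration of the desugaring this refers to:

val as = Seq(1, 2)
val bs = Seq("x", "y")
// A for-comprehension over two generators desugars to flatMap/map:
val viaFor = for { a <- as; b <- bs } yield (a, b)
val viaFlatMap = as.flatMap { a => bs.map { b => (a, b) } }
assert(viaFor == viaFlatMap)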

@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
Member

nit: unnecessary import.

tz => raw"""TBLPROPERTIES ($key="$tz")"""
}.getOrElse("")


Member

nit: remove extra line.

baseTable: String,
explicitTz: Option[String],
sessionTzOpt: Option[String]): Unit = {
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
Member

nit: indent

@SparkQA commented May 3, 2017

Test build #76419 has finished for PR 16781 at commit 2537437.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin (Member) commented May 8, 2017

Jenkins, retest this please.

@ueshin (Member) commented May 8, 2017

LGTM, pending Jenkins.

@SparkQA commented May 8, 2017

Test build #76556 has finished for PR 16781 at commit 2537437.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin (Member) commented May 8, 2017

Thanks! Merging to master.

@asfgit closed this in 2269155 on May 8, 2017
@squito (Contributor Author) commented May 8, 2017

Great! Thanks @ueshin

@rxin (Contributor) commented May 9, 2017

Did we conduct any performance tests on this patch?

liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017

Author: Imran Rashid <[email protected]>

Closes apache#16781 from squito/SPARK-12297.